package com.atzc.sink;



import com.atzc.bean.HomeLogBean;
import com.atzc.utils.mysql.MysqlConnect;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;



/**
 * Flink sink that writes every incoming {@link HomeLogBean} as one row into
 * the MySQL table {@code home_log_new}.
 *
 * <p>Lifecycle:
 * <ul>
 *   <li>{@link #open(Configuration)} obtains a connection from
 *       {@link MysqlConnect#getConnection()} and prepares the insert statement;</li>
 *   <li>{@link #invoke(HomeLogBean, Context)} binds the 27 columns of the record
 *       and executes the insert;</li>
 *   <li>{@link #close()} releases the statement and the connection.</li>
 * </ul>
 *
 * <p>The JDBC handles are {@code transient} and are created in {@code open()}
 * rather than in a field initializer, so they are never part of the serialized
 * function object and are established where the sink actually runs.
 */
public class HomeLogSinkMysql extends RichSinkFunction<HomeLogBean> {

    /**
     * Parameterized insert statement. The column order here must stay in sync
     * with the parameter indexes used in {@link #bindParameters}.
     */
    private static final String INSERT_SQL =
            "insert into home_log_new( create_time,requestid,token,`schema`,citycode,os,osversion,appversion,imei,"
                    + "publiclongitude,publiclatitude,publiccitycode,devicename,publictoken,create_date,import_date,"
                    + "update_time,mac,oaid,appname,appchannelid,androidid,starttime,endtime,memno,meminfocache,h5platform )"
                    + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

    /** Connection used by this sink instance; set in {@link #open}, released in {@link #close}. */
    private transient Connection connection;

    /** Prepared insert statement reused for every record; set in {@link #open}. */
    private transient PreparedStatement ps;

    /**
     * Obtains the MySQL connection and prepares the insert statement.
     *
     * @param parameters configuration passed through to {@code super.open}
     * @throws IllegalStateException if no connection could be obtained
     * @throws SQLException if the statement cannot be prepared (original cause attached)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        connection = MysqlConnect.getConnection();
        if (connection == null) {
            throw new IllegalStateException(
                    "HomeLogSinkMysql: MysqlConnect.getConnection() returned null");
        }

        try {
            ps = connection.prepareStatement(INSERT_SQL);
        } catch (SQLException e) {
            // Fail fast with the cause instead of leaving ps == null and hitting
            // an unexplained NullPointerException on the first record.
            throw new SQLException("error:首页日志编译sql出错了", e);
        }
    }

    /**
     * Binds all columns of the given record and executes the insert.
     *
     * <p>Binding and execution share one failure path: if any parameter cannot
     * be set, the statement is NOT executed, so a row can never be written with
     * values left over from a previous record.
     *
     * @param homeLogBean record to persist
     * @param context sink context (unused)
     * @throws IllegalStateException if the sink has not been opened
     * @throws SQLException if binding or executing the insert fails (original cause attached)
     */
    @Override
    public void invoke(HomeLogBean homeLogBean, Context context) throws Exception {
        if (ps == null) {
            throw new IllegalStateException(
                    "HomeLogSinkMysql: invoke() called before open() prepared the statement");
        }

        try {
            // Set the column values.
            bindParameters(ps, homeLogBean);
            // Execute the write.
            ps.execute();
        } catch (SQLException e) {
            throw new SQLException("error:首页日志插入数据出错了", e);
        }
    }

    /**
     * Releases the prepared statement and the connection. Safe to call when
     * {@link #open} did not complete; each resource is closed independently so
     * a failure closing the statement does not leak the connection.
     *
     * <p>NOTE(review): this assumes the connection returned by
     * {@code MysqlConnect.getConnection()} is owned by this sink instance (or is
     * a pooled handle whose {@code close()} returns it to the pool). If it is a
     * shared singleton, confirm that closing it here is acceptable.
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            ps = null;
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                connection = null;
                super.close();
            }
        }
    }

    /** Copies the 27 bean fields into the statement, in the column order of {@link #INSERT_SQL}. */
    private static void bindParameters(PreparedStatement statement, HomeLogBean bean)
            throws SQLException {
        statement.setString(1, bean.getCreate_time());
        statement.setString(2, bean.getRequestid());
        statement.setString(3, bean.getToken());
        statement.setString(4, bean.getSchema());
        statement.setString(5, bean.getCitycode());
        statement.setString(6, bean.getOs());
        statement.setString(7, bean.getOsversion());
        statement.setString(8, bean.getAppversion());
        statement.setString(9, bean.getImei());
        statement.setString(10, bean.getPubliclongitude());
        statement.setString(11, bean.getPubliclatitude());
        statement.setString(12, bean.getPubliccitycode());
        statement.setString(13, bean.getDevicename());
        statement.setString(14, bean.getPublictoken());
        statement.setString(15, bean.getCreate_date());
        statement.setString(16, bean.getImport_date());
        statement.setString(17, bean.getUpdate_time());
        statement.setString(18, bean.getMac());
        statement.setString(19, bean.getOaid());
        statement.setString(20, bean.getAppname());
        statement.setString(21, bean.getAppchannelid());
        statement.setString(22, bean.getAndroidid());
        statement.setString(23, bean.getStarttime());
        statement.setString(24, bean.getEndtime());
        statement.setString(25, bean.getMemno());
        statement.setString(26, bean.getMeminfocache());
        statement.setString(27, bean.getH5platform());
    }
}
